#include "topology.hpp"

#if HYRISE_NUMA_SUPPORT
#include <numa.h>
#endif

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <ostream>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

#include "types.hpp"

namespace hyrise {

// Definition of the static node count: initialized from libnuma's numa_num_configured_nodes() when HYRISE_NUMA_SUPPORT
// is enabled, otherwise fixed to 1 (a single node).
#if HYRISE_NUMA_SUPPORT
const int Topology::_number_of_hardware_nodes = numa_num_configured_nodes();  // NOLINT
#else
const int Topology::_number_of_hardware_nodes = 1;  // NOLINT
#endif

// Constructs a topology that is immediately initialized with the default layout. _init_default_topology() is called
// without an argument, i.e., with the default value of max_num_cores declared for it.
Topology::Topology() {
  _init_default_topology();
}

// Prints a single node as "Number of Node CPUs: <n>, CPUIDs: [<id>, <id>, ...]" (no trailing newline) and returns the
// stream to allow chaining.
std::ostream& operator<<(std::ostream& stream, const TopologyNode& topology_node) {
  const auto& node_cpus = topology_node.cpus;
  stream << "Number of Node CPUs: " << node_cpus.size() << ", CPUIDs: [";

  // The separator is written in front of every CPU ID except the first one, so the list never ends with ", ".
  auto is_first_cpu = true;
  for (const auto& cpu : node_cpus) {
    if (!is_first_cpu) {
      stream << ", ";
    }
    is_first_cpu = false;
    stream << cpu.cpu_id;
  }

  stream << "]";
  return stream;
}

// Re-initializes this topology with the default layout (real NUMA topology if the build has NUMA support, a single
// non-NUMA node otherwise). max_num_cores is forwarded unchanged; both underlying initializers treat 0 as "no limit".
void Topology::use_default_topology(uint32_t max_num_cores) {
  _init_default_topology(max_num_cores);
}

// Re-initializes this topology from the machine's NUMA layout. max_num_cores is forwarded unchanged (0 = no limit).
// Note that _init_numa_topology() falls back to a fake NUMA topology if NUMA support is not compiled in or libnuma
// reports NUMA as unavailable.
void Topology::use_numa_topology(uint32_t max_num_cores) {
  _init_numa_topology(max_num_cores);
}

// Re-initializes this topology as a single node that contains all CPUs. max_num_cores is forwarded unchanged and caps
// the number of CPUs (0 = no limit).
void Topology::use_non_numa_topology(uint32_t max_num_cores) {
  _init_non_numa_topology(max_num_cores);
}

// Builds a fake NUMA topology that spreads the available workers over artificial nodes holding (at most)
// workers_per_node workers each.
//
// max_num_workers:  Upper bound for the number of workers. 0 means "no limit", i.e., one worker per hardware thread
//                   as reported by std::thread::hardware_concurrency().
// workers_per_node: Number of workers per node. Must be greater than 0. If the worker count is not a multiple of this
//                   value, the last node receives the remainder.
//
// Throws std::invalid_argument if workers_per_node is 0. Without this check, the node count computation would divide
// by zero. If std::thread::hardware_concurrency() reports 0, the resulting topology contains no nodes.
void Topology::use_fake_numa_topology(uint32_t max_num_workers, uint32_t workers_per_node) {
  if (workers_per_node == 0) {
    throw std::invalid_argument("Topology::use_fake_numa_topology: workers_per_node must be greater than 0.");
  }

  auto remaining_workers = static_cast<uint32_t>(std::thread::hardware_concurrency());
  if (max_num_workers != 0) {
    remaining_workers = std::min(remaining_workers, max_num_workers);
  }

  // Hand out full nodes until fewer than workers_per_node workers are left; those form the last, smaller node. Only
  // the number of workers actually assigned is subtracted, so the unsigned counter can never wrap around.
  auto worker_count_per_node = std::vector<uint32_t>{};
  while (remaining_workers > 0) {
    const auto node_worker_count = std::min(remaining_workers, workers_per_node);
    worker_count_per_node.emplace_back(node_worker_count);
    remaining_workers -= node_worker_count;
  }

  _init_fake_numa_topology(worker_count_per_node);
}

// Re-initializes this topology as a fake NUMA topology with an explicit layout: one node is created per entry of
// workers_per_node, and each entry gives the number of CPUs/workers placed in that node.
void Topology::use_fake_numa_topology(const std::vector<uint32_t>& workers_per_node) {
  _init_fake_numa_topology(workers_per_node);
}

// Selects the default topology at compile time: the real NUMA topology when HYRISE_NUMA_SUPPORT is enabled, a single
// non-NUMA node otherwise. max_num_cores is passed on unchanged.
void Topology::_init_default_topology(uint32_t max_num_cores) {
#if HYRISE_NUMA_SUPPORT
  _init_numa_topology(max_num_cores);
#else
  _init_non_numa_topology(max_num_cores);
#endif
}

// Builds the topology from the NUMA layout reported by libnuma. Nodes are visited in ascending ID order; for each
// visited node, a TopologyNode is added that contains the node's CPUs which are also part of the CPU affinity mask
// (the node may end up empty). max_num_cores caps the total number of CPUs that are added (0 = no limit); nodes that
// would only be reached after the cap has been hit are not added at all.
// If NUMA support is not compiled in, or libnuma reports NUMA as unavailable, a fake NUMA topology is used instead.
void Topology::_init_numa_topology(uint32_t max_num_cores) {
#if HYRISE_NUMA_SUPPORT
  if (numa_available() < 0) {
    use_fake_numa_topology(max_num_cores);
    return;
  }

  _clear();
  _fake_numa_topology = false;

  const auto highest_node_id = numa_max_node();
  const auto configured_cpu_count = static_cast<CpuID>(numa_num_configured_cpus());

  // We take the CPU affinity (set, e.g., by numactl) of our process into account.
  // Otherwise, we would always start with the first CPU, even if a specific NUMA node was selected.
  auto* const affinity_mask = numa_allocate_cpumask();
  numa_sched_getaffinity(0, affinity_mask);

  auto* const node_mask = numa_allocate_cpumask();

  // Counts every CPU that belongs to both a visited node and the affinity mask, whether or not it was added.
  auto eligible_cpu_count = uint32_t{0};
  const auto core_limit_reached = [&]() {
    return max_num_cores != 0 && eligible_cpu_count >= max_num_cores;
  };

  for (auto node_id = 0; node_id <= highest_node_id; ++node_id) {
    // The counter never decreases, so once the limit is reached, no later node can contribute anything.
    if (core_limit_reached()) {
      break;
    }

    numa_node_to_cpus(node_id, node_mask);

    auto node_cpus = std::vector<TopologyCpu>();
    for (auto cpu_id = CpuID{0}; cpu_id < configured_cpu_count; ++cpu_id) {
      const auto allowed_by_affinity = numa_bitmask_isbitset(affinity_mask, cpu_id) != 0;
      if (!allowed_by_affinity) {
        // Remember that at least one configured CPU was excluded so that this can be reported when printing.
        _filtered_by_affinity = true;
        continue;
      }

      const auto belongs_to_node = numa_bitmask_isbitset(node_mask, cpu_id) != 0;
      if (!belongs_to_node) {
        continue;
      }

      if (!core_limit_reached()) {
        node_cpus.emplace_back(cpu_id);
        ++_num_cpus;
      }
      ++eligible_cpu_count;
    }

    _nodes.emplace_back(std::move(node_cpus));
  }

  numa_free_cpumask(affinity_mask);
  numa_free_cpumask(node_mask);
#else
  use_fake_numa_topology(max_num_cores);
#endif
}

// Builds a topology consisting of exactly one node that holds the CPUs 0 .. n-1, where n is the value reported by
// std::thread::hardware_concurrency(), capped at max_num_cores unless max_num_cores is 0 (= no limit).
void Topology::_init_non_numa_topology(uint32_t max_num_cores) {
  _clear();
  _fake_numa_topology = false;

  const auto hardware_cpu_count = static_cast<uint32_t>(std::thread::hardware_concurrency());
  _num_cpus = max_num_cores == 0 ? hardware_cpu_count : std::min(hardware_cpu_count, max_num_cores);

  auto all_cpus = std::vector<TopologyCpu>{};
  auto next_cpu_id = CpuID{0};
  while (next_cpu_id < _num_cpus) {
    all_cpus.emplace_back(next_cpu_id);
    ++next_cpu_id;
  }

  // All CPUs live in a single node.
  _nodes.emplace_back(std::move(all_cpus));
}

// Builds a fake NUMA topology with one node per entry of workers_per_node. Each entry gives the number of CPUs in the
// respective node. CPU IDs are assigned consecutively across nodes, starting at 0, and the total number of CPUs is
// the sum of all entries.
void Topology::_init_fake_numa_topology(const std::vector<uint32_t>& workers_per_node) {
  _clear();
  _fake_numa_topology = true;

  auto total_worker_count = uint32_t{0};
  auto next_cpu_id = CpuID{0};

  for (const auto node_worker_count : workers_per_node) {
    auto node_cpus = std::vector<TopologyCpu>();

    for (auto remaining = node_worker_count; remaining > 0; --remaining) {
      node_cpus.emplace_back(next_cpu_id);
      ++next_cpu_id;
    }

    _nodes.emplace_back(std::move(node_cpus));
    total_worker_count += node_worker_count;
  }

  _num_cpus = total_worker_count;
}

// Returns a read-only reference to the nodes of the current topology. The reference points at the member vector,
// which is cleared and refilled whenever the topology is re-initialized.
const std::vector<TopologyNode>& Topology::nodes() const {
  return _nodes;
}

// Returns the total number of CPUs across all nodes of the current topology.
size_t Topology::num_cpus() const {
  return _num_cpus;
}

void Topology::_clear() {
  _nodes.clear();
  _num_cpus = 0;
}

// Prints a multi-line summary of the topology: the total CPU count, a note if CPUs were filtered by an externally set
// CPU affinity, and one "Node #<index> - <node>" line per node. Returns the stream to allow chaining.
std::ostream& operator<<(std::ostream& stream, const Topology& topology) {
  stream << "Number of CPUs: " << topology.num_cpus() << '\n';

  if (topology._filtered_by_affinity) {
    stream << "Available CPUs / nodes were filtered by externally set CPU affinity (e.g., numactl).\n";
  }

  auto node_idx = size_t{0};
  for (const auto& node : topology.nodes()) {
    stream << "Node #" << node_idx << " - " << node << '\n';
    ++node_idx;
  }

  return stream;
}

}  // namespace hyrise
